# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

tbl <- tibble::tibble(
  x = 1:10,
  y = letters[1:10]
)
batch <- record_batch(tbl)
tab <- Table$create(tbl)

test_that("RecordBatchStreamReader / Writer", {
  sink <- BufferOutputStream$create()
  expect_equal(sink$tell(), 0)
  writer <- RecordBatchStreamWriter$create(sink, batch$schema)
  expect_r6_class(writer, "RecordBatchWriter")
  writer$write(batch)
  writer$write(tab)
  writer$write(tbl)
  expect_true(sink$tell() > 0)
  writer$close()

  buf <- sink$finish()
  expect_r6_class(buf, "Buffer")

  reader <- RecordBatchStreamReader$create(buf)
  expect_r6_class(reader, "RecordBatchStreamReader")

  batch1 <- reader$read_next_batch()
  expect_r6_class(batch1, "RecordBatch")
  expect_equal(batch, batch1)
  batch2 <- reader$read_next_batch()
  expect_r6_class(batch2, "RecordBatch")
  expect_equal(batch, batch2)
  batch3 <- reader$read_next_batch()
  expect_r6_class(batch3, "RecordBatch")
  expect_equal(batch, batch3)
  expect_null(reader$read_next_batch())
})

test_that("RecordBatchFileReader / Writer", {
  sink <- BufferOutputStream$create()
  writer <- RecordBatchFileWriter$create(sink, batch$schema)
  expect_r6_class(writer, "RecordBatchWriter")
  writer$write(batch)
  writer$write(tab)
  writer$write(tbl)
  writer$close()

  buf <- sink$finish()
  expect_r6_class(buf, "Buffer")

  reader <- RecordBatchFileReader$create(buf)
  expect_r6_class(reader, "RecordBatchFileReader")

  batch1 <- reader$get_batch(0)
  expect_r6_class(batch1, "RecordBatch")
  expect_equal(batch, batch1)

  expect_equal(reader$num_record_batches, 3)
})

test_that("StreamReader read_table", {
  sink <- BufferOutputStream$create()
  writer <- RecordBatchStreamWriter$create(sink, batch$schema)
  expect_r6_class(writer, "RecordBatchWriter")
  writer$write(batch)
  writer$write(tab)
  writer$write(tbl)
  writer$close()
  buf <- sink$finish()

  reader <- RecordBatchStreamReader$create(buf)
  out <- reader$read_table()
  expect_identical(dim(out), c(30L, 2L))
})

test_that("FileReader read_table", {
  sink <- BufferOutputStream$create()
  writer <- RecordBatchFileWriter$create(sink, batch$schema)
  expect_r6_class(writer, "RecordBatchWriter")
  writer$write(batch)
  writer$write(tab)
  writer$write(tbl)
  writer$close()
  buf <- sink$finish()

  reader <- RecordBatchFileReader$create(buf)
  out <- reader$read_table()
  expect_identical(dim(out), c(30L, 2L))
})

test_that("MetadataFormat", {
  expect_identical(get_ipc_metadata_version(5), 4L)
  expect_identical(get_ipc_metadata_version("V4"), 3L)
  expect_identical(get_ipc_metadata_version(NULL), 4L)
  Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = 1)
  expect_identical(get_ipc_metadata_version(NULL), 3L)
  Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = "")

  expect_identical(get_ipc_metadata_version(NULL), 4L)
  Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1)
  expect_identical(get_ipc_metadata_version(NULL), 3L)
  Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = "")

  expect_error(
    get_ipc_metadata_version(99),
    "99 is not a valid IPC MetadataVersion"
  )
  expect_error(
    get_ipc_metadata_version("45"),
    '"45" is not a valid IPC MetadataVersion'
  )
})

test_that("reader with 0 batches", {
  # IPC stream containing only a schema (ARROW-10642)
  sink <- BufferOutputStream$create()
  writer <- RecordBatchStreamWriter$create(sink, schema(a = int32()))
  writer$close()
  buf <- sink$finish()

  reader <- RecordBatchStreamReader$create(buf)
  tab <- reader$read_table()
  expect_r6_class(tab, "Table")
  expect_identical(dim(tab), c(0L, 1L))
})

test_that("reader head method edge cases", {
  batch <- record_batch(
    x = 1:10,
    y = letters[1:10]
  )
  sink <- BufferOutputStream$create()
  writer <- RecordBatchStreamWriter$create(sink, batch$schema)
  writer$write(batch)
  writer$write(batch)
  writer$close()
  buf <- sink$finish()

  reader <- RecordBatchStreamReader$create(buf)
  expect_error(head(reader, -1)) # Not (yet) supported
  expect_equal(
    Table$create(head(reader, 0)),
    Table$create(x = integer(0), y = character(0))
  )
  expect_equal(
    Table$create(head(reader, 100)),
    Table$create(batch, batch)
  )
})

test_that("RBR methods", {
  batch <- record_batch(
    x = 1:10,
    y = letters[1:10]
  )
  sink <- BufferOutputStream$create()
  writer <- RecordBatchStreamWriter$create(sink, batch$schema)
  writer$write(batch)
  writer$write(batch)
  writer$close()
  buf <- sink$finish()

  reader <- RecordBatchStreamReader$create(buf)
  expect_output(
    print(reader),
    "RecordBatchStreamReader
2 columns
x: int32
y: string"
  )
  expect_named(reader, c("x", "y"))
  expect_identical(dim(reader), c(NA_integer_, 2L))

  expect_equal(
    as.data.frame(reader),
    rbind(as.data.frame(batch), as.data.frame(batch))
  )
})

test_that("as_record_batch_reader() works for RecordBatchReader", {
  skip_if_not_available("dataset")

  batch <- record_batch(a = 1, b = "two")
  reader <- Scanner$create(batch)$ToRecordBatchReader()
  expect_identical(as_record_batch_reader(reader), reader)
})

test_that("as_record_batch_reader() works for Scanner", {
  skip_if_not_available("dataset")

  batch <- record_batch(a = 1, b = "two")
  scanner <- Scanner$create(batch)
  reader <- as_record_batch_reader(scanner)
  expect_equal(reader$read_next_batch(), batch)
})

test_that("as_record_batch_reader() works for Dataset", {
  skip_if_not_available("dataset")

  dataset <- InMemoryDataset$create(arrow_table(a = 1, b = "two"))
  reader <- as_record_batch_reader(dataset)
  expect_equal(
    reader$read_next_batch(),
    record_batch(a = 1, b = "two")
  )
})

test_that("as_record_batch_reader() works for Table", {
  table <- arrow_table(a = 1, b = "two")
  reader <- as_record_batch_reader(table)
  expect_equal(reader$read_next_batch(), record_batch(a = 1, b = "two"))
})

test_that("as_record_batch_reader() works for RecordBatch", {
  batch <- record_batch(a = 1, b = "two")
  reader <- as_record_batch_reader(batch)
  expect_equal(reader$read_next_batch(), batch)
})

test_that("as_record_batch_reader() works for data.frame", {
  df <- tibble::tibble(a = 1, b = "two")
  reader <- as_record_batch_reader(df)
  expect_equal(reader$read_next_batch(), record_batch(a = 1, b = "two"))
})

test_that("as_record_batch_reader() works for function", {
  batches <- list(
    record_batch(a = 1, b = "two"),
    record_batch(a = 2, b = "three")
  )

  i <- 0
  fun <- function() {
    i <<- i + 1
    if (i > length(batches)) NULL else batches[[i]]
  }

  reader <- as_record_batch_reader(fun, schema = batches[[1]]$schema)
  expect_equal(reader$read_next_batch(), batches[[1]])
  expect_equal(reader$read_next_batch(), batches[[2]])
  expect_null(reader$read_next_batch())

  # check invalid returns
  fun_bad_type <- function() "not a record batch"
  reader <- as_record_batch_reader(fun_bad_type, schema = schema())
  expect_error(
    reader$read_next_batch(),
    "Expected fun\\(\\) to return an arrow::RecordBatch"
  )

  fun_bad_schema <- function() record_batch(a = 1)
  reader <- as_record_batch_reader(fun_bad_schema, schema = schema(a = string()))
  expect_error(
    reader$read_next_batch(),
    "Expected fun\\(\\) to return batch with schema 'a: string'"
  )
})

test_that("as_record_batch_reader() errors on data.frame with NULL names", {
  df <- data.frame(a = 1, b = "two")
  names(df) <- NULL
  expect_error(as_record_batch_reader(df), "Input data frame columns must be named")
})
